-
Notifications
You must be signed in to change notification settings - Fork 2k
[Kernel-Spark] Fix startingVersion support on non re-creatable version by skipping start snapshot validation #5574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…ad time This change fixes the startingVersion support in DSv2 streaming by: 1. Removing the requirement for initialSnapshot parameter 2. Using getActionsFromCommitFilesWithProtocolValidation to validate protocol for each commit at read time (similar to V1 behavior) 3. Extracting getActionsUnsafe utility method to StreamingHelper The key insight is that we don't need a pre-loaded snapshot for protocol validation - we can validate each commit's protocol as we read it, which matches V1's behavior and avoids the issue where initialSnapshot.version < startVersion. Test: DeltaSourceDSv2Suite startingVersion tests now pass
| return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty(); | ||
| } | ||
|
|
||
| public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment explain in what way this is unsafe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| public SparkMicroBatchStream( | ||
| DeltaSnapshotManager snapshotManager, | ||
| Snapshot initialSnapshot, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't necessarily need to plumb in an initialSnapshot, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to get the table path and pass to ValidateCommit method. Let me just plumb the path instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually after rebasing I found that we are trying to load the latest snapshot again. So we may want to plumb it so that we could get snapshot, table id and path without getting snapshot again
- Resolved conflicts in SparkMicroBatchStream.java and SparkMicroBatchStreamTest.java - Updated constructor calls to use tablePath parameter instead of initialSnapshot - Applied master's improvements (try-catch for CommitRangeNotFoundException, Java Optional usage)
zikangh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, Xin!
|
|
||
| public SparkScan( | ||
| DeltaSnapshotManager snapshotManager, | ||
| io.delta.kernel.Snapshot initialSnapshot, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to fully quality here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, let's avoid this
| String tablePath = startSnapshot.getPath(); | ||
| try (CloseableIterator<ColumnarBatch> actionsIter = | ||
| commitRange.getActions(engine, startSnapshot, ACTION_SET)) { | ||
| StreamingHelper.getActionsFromRangeUnsafe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment here documenting the exact use case for why we are calling getActionsFromRangeUnsafe?
Which Delta project/connector is this regarding?
Description
This PR fixes the
startingVersionsupport in DSv2 streaming by skipping start snapshot validation when callinggetActions.Problem
The Kernel API's
CommitRange.getActions()strictly requiredsnapshot.version == startVersion, which failed when:startingVersion="latest"and no new data exists (effective version =latestVersion + 1)Solution
Use low-level Kernel API: Call
DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation()directly instead ofCommitRange.getActions(), bypassing the strict snapshot version checkWhether we want to update kernel API to allow us bypass validation worth longer discussion. We just made this change in Dsv2 connector to make sure v1 behavior parity.
How was this patch tested?
Unit
Does this PR introduce any user-facing changes?
No